feat(OpenTelemetry/Transport): implement Kafka transport#196
feat(OpenTelemetry/Transport): implement Kafka transport#196GeoDpto wants to merge 1 commit intoFriendsOfOpenTelemetry:mainfrom
Conversation
ed4ff11 to
18ea244
Compare
18ea244 to
21c6f8c
Compare
gaelreyrol
left a comment
There was a problem hiding this comment.
Hi @GeoDpto, thank you very much for this pull request!
A few comments outside of your changes:
TransportFactoryTestwas not updated with Kafka test case.- Remove all the
declare(strict_types=1)usage.
| private function flushInternal(): bool | ||
| { | ||
| // librdkafka recommends retrying the flush operation a couple of times when it returns a null result. | ||
| $timeout = self::FLUSH_TIMEOUT; | ||
| $start = \microtime(true); | ||
| do { | ||
| $res = $this->producer->flush($timeout); | ||
| if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) { | ||
| return true; | ||
| } | ||
|
|
||
| // reduce timeout | ||
| $elapsedMs = (int) \round((\microtime(true) - $start) * 1000); | ||
| $timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs); | ||
| } while ($timeout > 0); | ||
|
|
||
| return false; | ||
| } |
There was a problem hiding this comment.
This internal flush mechanic needs documentation, I have never used Kafka but I think we should provide incentives about how the transport is working under the hood.
Also, non-success errors seems to be treated equally which could hide permanent errors right?
| } | ||
|
|
||
| return new KafkaTransport($conf, $dsn->getHost()); | ||
| } |
There was a problem hiding this comment.
We should document this transformation.
| */ | ||
| final readonly class KafkaTransport implements TransportInterface | ||
| { | ||
| private const FLUSH_TIMEOUT = 10000; |
There was a problem hiding this comment.
While I don't have any problem with having a default value, I think this should be configurable from the dsn and documented.
| |-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------| | ||
| | http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A | | ||
| | grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A | | ||
| | kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A | |
There was a problem hiding this comment.
The open_telemetry_local_alpha_traces is ambiguous compared to the other examples, we should leave to a "standard" host:port example.
| if (!\class_exists(Conf::class)) { | ||
| self::markTestSkipped('rdkafka extension not available in the test environment.'); |
| if (TransportEnum::Kafka === $this->transport) { | ||
| return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString()); | ||
| } | ||
|
|
There was a problem hiding this comment.
Can you elaborate on this change?
| Conf $configuration, | ||
| string $topic, | ||
| ) { | ||
| if (!\class_exists(Conf::class)) { |
There was a problem hiding this comment.
This wont work because PHP resolves RdKafka\Conf before the check, so the friendly exception won't be thrown:
Error: Class "RdKafka\Conf" not found
| "ext-grpc": "*", | ||
| "ext-mbstring": "*", | ||
| "ext-opentelemetry": "*", | ||
| "ext-rdkafka": "*", |
There was a problem hiding this comment.
The extension should be suggested too.
| return new ErrorFuture($exception); | ||
| } | ||
|
|
||
| return new CompletedFuture(null); |
There was a problem hiding this comment.
What happens if an error occurs while queuing the payload, the new CompletedFuture(null); silences it.
Summary
Our project encountered performance bottlenecks due to synchronous communication between services and the OpenTelemetry Collector.
Problem
The synchronous transport introduced delays and reduced throughput when exporting telemetry data.
Solution
OpenTelemetry provides different transport protocols for exporting data. In production, it's recommended to integrate the OpenTelemetry Collector, which can consume Kafka topics using the Protobuf serialization, so we chose this option. This enables asynchronous communication between services.
This merge request implements the Kafka transport and factory providing possibility to process data asynchronously.
Changes